30 jun. 2012

Tutorial de ZeroMQ con Python( Pipelining). Parte 7.

Siguiendo con los ejemplos sobre ZeroMQ ahora mostraré como usar diferentes tipos de transporte (antes se mostrada a tcp, ahora se usará tcp e ipc el cual es un modelo de comunicación interprocesos y puede ser de ayuda cuando se necesita baja latencia).

Luego de escoger la capa de transporte se selecciona un patrón de mensaje, en artículos anteriores se ha mostrado los patrones REQ/REP, PUB/SUB y Pair, sólo queda por explicar su funcionamiento el patrón UPSTREAM/DOWNSTREAM.

El patrón UPSTREAM/DOWNSTREAM se parece al patrón REQ/REP, la diferencia es que en REQ/REP puede existir comunicación bidireccional en cambio con UPSTREAM/DOWNSTREAM se tiene una comunicación en un sólo sentido y permite paralelizar procesos. En la figura se muestra el flujo de trabajo.
Un ejemplo de su posible uso tomado del artículo de Nicholas Piël es que se tenga un sistema de reconocimiento de imágenes en tiempo real, el servidor captura las imágenes y se las envía a los workers para que sean procesadas, y al terminar dicho proceso se envían las imágenes al colector.

El servidor simplemente tiene 2 listas una de cadenas de texto que al final envía 2 "x" para que los clientes finalicen. Se crea un socket con capa de transporte tcp para envíar la lista de números, luego se crea el socket para ipc y se envía las cadenas de texto.

Se notará que los clientes simplemente se conectan al servidor, uno de los clientes se conecta por tcp y el otro por ipc.

El código del servidor se muestra a continuación:

#!/usr/bin/env python

#Se importa zeroMQ y sleep de time

import zmq

from time import sleep



#Se crea la instancia del contexto

ctx = zmq.Context()

#Se crea el socket con parametro DOWNSTREAM

socket = ctx.socket(zmq.DOWNSTREAM)

#Se crea una lista de textos

lista1 = ("Esta es una prueba,1","Desde zeroMQ,2","Sigo probando,3","Prueba...,4","q,5","1,6","q,7","x","x")

#Se crea una lista de un rango de numeros

lista2 = range(1,4)

#Se asocia el socket a una IP y puerto

socket.bind("tcp://127.0.0.1:12345")



#Se envia la lista de numeros a los clientes

#que se conectan al servidor.

#El envio se hace cada segundo.

for i in lista2:

 socket.send("%s" %i)

 print "Enviado %s" %i

 sleep(1)



#Se asocia el socket a un archivo por

#medio del transporte ipc

socket.bind("ipc:///tmp/zmqdemo")

#Se envia la lista de textos a los clientes conectados

#el envio se realiza cada segundo.

for i in lista1:

 socket.send(i)

 print "Enviado %s" %i

 sleep(1)


El código del cliente que usa tcp como transporte se muestra a continuación:


#!/usr/bin/env python

#Se importa el modulo zeroMQ

import zmq

#Se crea la instancia del contexto

ctx = zmq.Context()

#Se crea el socket con Parametro UPSTREAM

socket = ctx.socket(zmq.UPSTREAM)

#Se conecta el socket (tcp) al servidor y su puerto

socket.connect("tcp://127.0.0.1:12345")

#Se crea un ciclo que recibe lo que envia el 

#servidor y lo muestra en pantalla

#y se recibe la letra x finaliza la conexion del cliente

while True:

 rcv =  socket.recv()

 if rcv == "x" :

  print rcv

  break

 print rcv 

Se muestra el código del cliente utilizando como transporte ipc:

#!/usr/bin/env python

#Se importa el modulo zeroMQ

import zmq

#Se crea la instancia del contexto

ctx = zmq.Context()

#Se crea el socket con parametro UPSTREAM

socket = ctx.socket(zmq.UPSTREAM)




#Se conecta el socket a un archivo temporal

#por medio de ipc como transporte

socket.connect("ipc:///tmp/zmqdemo")

#Se crea un ciclo para recibir

#la informacion del servidor

#se muestra en pantalla 

#si se recibe la letra x 

#finaliza la conexion con el servidor

while True:

 rcv = socket.recv()

 if rcv ==  'x':

  print rcv

  break

 print rcv

La siguientes figuras muestran el resultado en el servidor y ambos clientes:
Servidor:

Cliente (tcp):
Cliente (ipc):

Como se nota en las figuras de los clientes, la información que envía el servidor a los clientes se distribuye entre ambos.


3 jun. 2012

Tutorial de ZeroMQ con Python( PUSH/PULL). Parte 6.

En este artículo se explica el funcionamiento de PUSH y PULL.

La idea es que se tiene el emisor, quien envía a los "workers" unos string que son procesados de distinta manera o de igual manera (una forma de crear escalabilidad). Luego que los "workers" procesan la información recibida del emisor se la pasan a resultado quien presenta la información en pantalla.

La figura muestra el proceso ya explicado:
El código del emisor es el siguiente:

#!/usr/bin/python

#Se importa zeroMQ y random

import zmq

import random

#Se crea la instancia del contexto

context = zmq.Context()

#Se crea el socket con el argumento PUSH

envio =context.socket(zmq.PUSH)

#Se asocia el socket a escuchar todas las IPs y el puerto 5557

envio.bind("tcp://*:5557")




#se muestra que es necesario esperar que arranquen los workers

print "Hay que esperar que los workers se inicien"

#Al dar enter se inicia el proceso de transmision

raw_input()

print "Se inicia la transmision del trabajo..."

#tupla de strings que se van a enviar

cadenas = ['hola', 'aloha','hello','buenas noches','buenas tardes','buenos dias','bienvenido']




#Se crea un ciclo para recorrer la tupla

for i in range(len(cadenas)):

 cadena = cadenas[i]

 envio.send(cadena)

 print "Enviando: {0}".format(cadena)

El código del worker es el siguiente:

#!/usr/bin/python

#Se importa ZeroMQ y sleep de time

import zmq

from time import sleep




#Se crea la instancia del contexto

context = zmq.Context()

#Se define el Socket con argumento PULL

recepcion = context.socket(zmq.PULL)

#Se conecta el socket a localhost puerto 5557

#Es el puerto donde origen envia con PUSH los datos

recepcion.connect("tcp://localhost:5557")




#Se crea el socket de envio de los datos procesados con argumento PUSH

envio = context.socket(zmq.PUSH)

#Se conecta el socket a localhost y puerto 5558

envio.connect("tcp://localhost:5558")




#Se genera un ciclo

#donde se recive lo transmitido por origen

#se procesa (se coloca en mayusculas)

#se muestra en pantalla y se envia.

#los ciclos tienen un retardo de 1 seg

while True:

 cadena = recepcion.recv()

 print "Proceso:{0}".format(cadena)

 envio.send(cadena.upper())

 sleep(1)

El código de resultado es el siguiente:

#!/usr/bin/python




#Se importa ZeroMQ

import zmq

#Se crea la instancia del contexto

context = zmq.Context()




#Se crea el socket PULL que recibe los mensajes de los workers

recepcion = context.socket(zmq.PULL)

#Se asocia el socket a escuchar todas las IPs en el puerto 5558

#el puerto donde los workers envian los mensajes

recepcion.bind("tcp://*:5558")

#Se inicia un ciclo donde se recibe los mensajes

#de los workers y se muestra en pantalla

while True:

 mensaje = recepcion.recv()

 print "Recibo: {0}".format(mensaje)


Se tiene que iniciar los scripts resultado, worker (worker1 y una copia identica llamada worker2) y origen.

La siguiente figura muestra el resultado de origen:


La siguiente figura muestra el resultado del worker1 y worker2:

Se nota que los workers se distribuyen el trabajo de forma equitativa.

En la siguiente figura se muestra lo que despliega en pantalla resultado:

Como se ve, los workers procesan las cadenas de texto que reciben de origen, las pasan a mayúsculas y se la envían a resultado para que las muestre en pantalla.

2 jun. 2012

Tutorial de ZeroMQ con Python( REP/REQ y PUB/SUB). Parte 5.

En la revista Linux Magazine versión en español publicaron un artículo sobre ZeroMQ con Python.

Explican el uso de 2 patrones en conjunto, REP/REQ y PUB/SUB. Colocan como ejemplo una simulación del proceso de envío de tweets de twitter.

La siguiente figura muestra la simulación:

El emisor genera los mensajes, el tuiter los recibe y los publica, luego cada receptor se suscribe al tuiter y recibe la información que le interesa.

EL código del emisor es el siguiente:


#!/usr/bin/python




#Se importa zeroMQ

import zmq

#Se crea el contexto

context = zmq.Context()

#Se crea el socket con el parametro REQ

socket = context.socket(zmq.REQ)

#Se asocia la IP y el puerto del socket.

socket.connect("tcp://127.0.0.1:4000")




#Se genera los mensajes estilo tuiter y se envia al socket.

for i in ['@_seraph1 Esta es una prueba','@otro viendo el juego', '@_seraph1 otra prueba','@otro otro']:

 socket.send(i)

 msg_in = socket.recv()

El código del tuiter es el siguiente:

#!/usr/bin/env python

#Se importa zeroMQ

import zmq

#Se importa choice de random

from random import choice

#Se crea el contexto

context = zmq.Context()

#Se define el socket de recepcion con argumento REP

socket_recv = context.socket(zmq.REP)

#Se asocia a una IP y puerto el socket de recepcion

socket_recv.bind("tcp://127.0.0.1:4000")




#Se define el socket de publicacion con argumento PUB

socket = context.socket(zmq.PUB)

#Se asocia la ip y un puerto distinto al anterio socket

socket.bind("tcp://127.0.0.1:5000")




#Se crea un ciclo

while True:

 #Se recibe el mensaje del socket de recepcion

 msg = socket_recv.recv()

 #Se envia el mensaje de recepcion

 socket_recv.send(msg)

 #Se muestra el mensaje en pantalla

 print "Reenvio: {0}".format(msg)

 #Se envia  el mensaje al socket de publicacion

 socket.send(msg)

Se muestra ahora el código de los receptores:
Receptor 1:

#!/usr/bin/python

#Se importa zeroMQ

import zmq

#Se crea el contexto

context = zmq.Context()

#Se crea el socket de suscripcion

socket = context.socket(zmq.SUB)

#Se asocia ese socket a la IP y puerto donde publica tuiter

socket.connect("tcp://127.0.0.1:5000")

#Se suscribe a escuchar los mensajes de @_seraph1

socket.setsockopt(zmq.SUBSCRIBE, "@_seraph1")

#se crea un ciclo donde se recibe los mensajes

while True:

 print "->",socket.recv()

Receptor 2:


#!/usr/bin/python

#Se importa zeroMQ

import zmq

#Se crea el contexto

context = zmq.Context()

#Se crea el socket de suscripcion

socket = context.socket(zmq.SUB)

#Se asocia ese socket a la IP y puerto donde publica tuiter

socket.connect("tcp://127.0.0.1:5000")

#Se suscribe a escuchar los mensajes de @otro

socket.setsockopt(zmq.SUBSCRIBE, "@otro")

#se crea un ciclo donde se recibe los mensajes

while True:

 print "->",socket.recv()




En la figura se muestra el resultado de tuiter:

En la siguiente figura se muestra el resultado del receptor 1:
Y por último el receptor 2:
En la versión de la revista no publicaron el código de los receptores.
Esto muestra el nivel de complejidad en sistema de colas que se puede lograr con ZeroMQ.

1 jun. 2012

Tutorial de ZeroMQ con Python(PUB/SUB). Parte 4.

Continuando con los tutoriales sobre ZeroMQ ahora se muestra como usar Publicar/Suscribir (Publish/subscribe).

En el patrón pub/sub los componentes son pobremente acompladas, será de gran ayuda para escalar ya que no hay necesidad de preocuparse por los suscriptores.  Sin embargo, este acoplamiento puede conducir a un comportamiento inesperado cuando no se entienden completamente.

El ejemplo que se va a desarrollar se basa en la publicación sobre la introducción de ZeroMQ de Nicholas Piël.

El servidor genera una serie de mensaje de países y de eventos por país, la idea es publicar de forma aleatoria un país y un evento. Se creará 2 clientes donde cada uno mostrará la información de 2 países, eso es gracias a PUB/SUB ya que el suscriptor sólo recibe la información que necesita.

A continuación se muestra el código del servidor:

#!/usr/bin/env python #Se importa ZeroMQ import zmq #Se importa choice a partir de random from random import choice #Se crea la instancia del contexto context = zmq.Context() #Se crea el socket pasandole argumento de publicacion PUB socket = context.socket(zmq.PUB) #Se asocia la IP y el puerto que va a escuchar. socket.bind("tcp://127.0.0.1:5000") #Se importa sleep from time import sleep #Se crea una lista de paises y de eventos paises = ['holanda','brasil','alemania','portugal','argentina','italia','rusia','venezuela'] eventos = ['tarjeta amarilla','tarjeta roja','gol','corner','falta'] #Se crea un contador con valor inicial 1 c = 1 #Se crea un ciclo indefinido while True:     #Se define un mensaje pasando de forma aleatoria un pais y un evento     mensaje = choice( paises) + " " + choice(eventos)     #Se muestra en pantalla el valor del contador y el mensaje.     print "->",c , mensaje     #Se envia el mensaje     socket.send(mensaje)     #Se genera un retardo de 1 seg     sleep(1)     #Se incrementa el contador     c += 1     #Si se llega a 180 se termina el ciclo si no continua.     if c == 180:         break     else:         continue
A continuación el código de los 2 clientes(suscriptores):
Cliente1:

#!/usr/bin/env python

#Se importa zeroMQ
import zmq
#Se importa sleep a partir de time
from time import sleep 
#Se crea la instancia del contexto de zeroMQ
context = zmq.Context()
#Se crea el socket del suscriptor SUB
socket = context.socket(zmq.SUB)
#Se crea la conexion a la IP y puerto del servidor
socket.connect("tcp://127.0.0.1:5000")
#Se define una opcion del socket del suscriptor con argentina y venezuela
socket.setsockopt(zmq.SUBSCRIBE, "argentina")
socket.setsockopt(zmq.SUBSCRIBE, "venezuela")
#Se define el valor inicial de un contador
c = 1 
#Se crea un ciclo indefinido
while True:
    #Se muestra en pantalla el valor del contador y el mensaje recibido
    print  c, "->",socket.recv()
    #Se genera un retardo de 1 seg en cada ciclo
    sleep(1)
    #Se incrementa el contador en 1
    c += 1
    #Si el contador llega a 90 se termina el ciclo, si no continua
    if c == 90:
        break
    else:
        continue

Cliente 2:

#!/usr/bin/env python

#Se importa zeroMQ
import zmq
#Se importa sleep a partir de time
from time import sleep 
#Se crea la instancia del contexto de zeroMQ
context = zmq.Context()
#Se crea el socket del suscriptor SUB
socket = context.socket(zmq.SUB)
#Se crea la conexion a la IP y puerto del servidor
socket.connect("tcp://127.0.0.1:5000")
#Se define una opcion del socket del suscriptor con brasil y alemania
socket.setsockopt(zmq.SUBSCRIBE, "brasil")
socket.setsockopt(zmq.SUBSCRIBE, "alemania")
#Se define el valor inicial de un contador
c = 1 
#Se crea un ciclo indefinido
while True:
    #Se muestra en pantalla el valor del contador y el mensaje recibido
    print  c, "->",socket.recv()
    #Se genera un retardo de 1 seg en cada ciclo
    sleep(1)
    #Se incrementa el contador en 1
    c += 1
    #Si el contador llega a 90 se termina el ciclo, si no continua
    if c == 90:
        break
    else:
        continue



Es como si el servidor estuviera narrando varios juegos de futbol y los clientes sólo muestran la información del partido de futbol que les interesa.

La siguiente figura muestra el resultado del servidor:

A continuación se muestra las 2 fíguras de los clientes:
Cliente 1 (Venezuela, Argentina):

Cliente 2(Brasil,Alemania):

Ya se va mostrando cosas más complicadas que se pueden hacer con ZeroMQ, y todavía faltan...